“Books are a narcotic.”
― Franz Kafka
突然想起了恐龍書
Kafka Connect 是一個可靠的、可隨情境擴增或是縮減的資料傳輸工具,用來處理 Kafka 跟其他資料儲存系統間的資料傳輸,透過定義 connector 可以輕易地從 Kafka 傳入和傳出大量資料。
Kafka Connect 可以消化掉整個資料庫、搜集你app server的資料集,放入到 Kafka topics 中,確保資料在低延遲下可以被取用。匯出方面,Connector 可以將 Kafka topics 的資料傳送給 Elasticsearch 或是離線分析用的Hadoop系統。
Kafka Connect 可以分為匯入的 source connector 跟匯出 sink connector,目前可以支援的範圍很廣,你可以將 Microsoft SQL Server、MQTT、Java JDBC、IMB MQ、salesforce、JSON檔案、poster Sql、CSV檔案、Mysql...等資料透過 source connector 匯入 Kafka topic,再透過 sink connector 將資料匯出到 Google BigQuery、hadoop、Amazon S3、elasticsearch、snowflake、ORACLE、各類DB...等。
坐而言不如起而行,今天會帶大家簡單實作一個小練習,我們將會透過 Kafka Connect 讓兩個資料庫資料對接,做到類似ETL的功能,主要分為三個步驟:
首先,在本地的 Mysql 創建一個來源資料庫跟目標資料庫
mysql> create database `source_database` default character set utf8mb4 collate utf8mb4_unicode_ci;
Query OK, 1 row affected (0.00 sec)
mysql> create database `target_database` default character set utf8mb4 collate utf8mb4_unicode_ci;
Query OK, 1 row affected (0.01 sec)
在來源資料庫 source_database 和 target_database 各創建一張表當作資料來源
mysql> use source_database;
Database changed
mysql> CREATE TABLE `source_users` (
`id` INT(11) NOT NULL AUTO_INCREMENT,
`username` VARCHAR(20) NOT NULL,
`nickname` VARCHAR(20) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
Query OK, 0 rows affected (0.05 sec)
mysql> use target_database;
Database changed
mysql> CREATE TABLE `target_users` (
`id` INT(11) NOT NULL AUTO_INCREMENT,
`username` VARCHAR(20) NOT NULL,
`nickname` VARCHAR(20) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
Query OK, 0 rows affected (0.03 sec)
wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.2.1/confluentinc-kafka-connect-jdbc-10.2.1.zip
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.20/mysql-connector-java-8.0.20.jar
unzip confluentinc-kafka-connect-jdbc-10.2.1.zip
mkdir -p kafkaConnect/lib
mv confluentinc-kafka-connect-jdbc-10.2.1 kafkaConnect/
mv mysql-connector-java-8.0.20.jar kafkaConnect/lib/
我們今天是採用 connector 的 distributed 模式,另外還有 standalone 模式,官方建議線上採用 distributed 的模式,因為可擴增性、可用性和管理等各方面都更佳
connect-distributed.properties
bootstrap.servers=127.0.0.1:9092 # 指到 Broker Server 的 IP 位子
group.id=connect-cluster
rest.port=8083 # REST 介面監聽的 port,預設是8083,順便一題如果你是用 connect 的 standalone 模式,預設 port 是 8084。
plugin.path=/usr/local/etc/kafkaConnect # 剛剛創建資料夾的絕對路徑
connect-distributed /usr/local/etc/kafka/connect-distributed.properties
$ curl 'http://127.0.0.1:8083/connector-plugins'
[
{
"class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"type":"sink",
"version":"10.2.1"
},
{
"class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"type":"source",
"version":"10.2.1"
},
{
"class":"org.apache.kafka.connect.file.FileStreamSinkConnector",
"type":"sink",
"version":"2.8.0"
},
{
"class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
"type":"source",
"version":"2.8.0"
},
{
"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"type":"source",
"version":"1"
},
{
"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"type":"source",
"version":"1"
},
{
"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector",
"type":"source",
"version":"1"
}
]